package com.example.socket.client;

import com.example.socket.core.*;
import com.example.socket.exception.SocketException;
import com.example.socket.handler.Dispatcher;
import com.example.socket.handler.Processor;
import com.example.socket.handler.SnGenerator;
import com.example.socket.handler.TypeDefinition;
import com.example.socket.thread.DelayedElement;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.helpers.FormattingTuple;
import org.slf4j.helpers.MessageFormatter;

import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Date;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;

/**
 * Base implementation of {@link Client} that sends messages to a server over a Netty
 * {@link Channel}.
 *
 * <p>This class does not assign its collaborators ({@code dispatcher}, {@code factory},
 * {@code generator}, {@code futures}, {@code delays}, {@code sendProxy}, {@code address});
 * they are presumably set by subclasses or by the {@link ClientFactory} before use.
 *
 * <p>Thread-safety note: {@code channel}, {@code retry}, {@code sessionId} and
 * {@code sessionChange} are {@code volatile}, which only guarantees visibility. Compound
 * updates such as {@code retry++} are not atomic.
 *
 * @param <E> concrete session type produced by {@link #createSession()}
 */
@SuppressWarnings("rawtypes")
public abstract class AbstractClient<E extends Session> implements Client {

    protected static final Logger logger = LoggerFactory.getLogger(AbstractClient.class);

    /** Dispatcher used to register processors and to encode/send messages. */
    protected Dispatcher dispatcher;
    /** Channel of the current connection to the server; {@code null} once closed via {@link #close()}. */
    protected volatile Channel channel;
    /** Remote address to connect to. */
    protected InetSocketAddress address;
    /** Number of connection/send attempts since the last successful connect. */
    protected volatile int retry;
    /** Netty bootstrap; not referenced in this class (presumably used by subclasses in createChannel). */
    protected Bootstrap connector;
    /** Factory that owns this client. */
    protected ClientFactory factory;
    /** Whether this is a keep-alive (long-lived) connection. */
    boolean keepAlive = false;
    /** Creation time of this client, in epoch milliseconds. */
    volatile long timestamp = System.currentTimeMillis();
    /** Session id; {@code -1} by default. */
    protected volatile long sessionId = -1L;
    /** Whether the session id changed since the session was last created. */
    private volatile boolean sessionChange = true;
    /** Session handed to the dispatcher when sending. */
    protected E session;
    /** Generator for message serial numbers (sn). */
    protected SnGenerator generator;
    /** Pending response futures, keyed by message serial number. */
    protected ConcurrentHashMap<Integer, SocketFuture> futures;
    /** Delay queue of expired clients and messages awaiting resend; also receives request time-out entries. */
    protected DelayQueue<DelayedElement<?>> delays;

    /** Factory for send proxies. */
    protected SendProxyFactory sendProxy;
    /** Whether heartbeats are sent. */
    boolean sendHeartBeat;
    /** Client type; passed to {@code MessageResend.valueOf} when a message is queued for resend. */
    int type;

    /** Time-out handed to every {@link SocketFuture} created by this client. */
    protected int timeOut = SocketFuture.TIME_OUT;

    /**
     * Registers a processor for a command by delegating to the dispatcher.
     *
     * @param command    command to register
     * @param definition type definition of the command
     * @param processor  processor handling the command
     */
    @Override
    public void register(Command command, TypeDefinition definition, Processor<?, ?> processor) {
        dispatcher.register(command, definition, processor);
    }

    /**
     * Registers an interface class by delegating to the dispatcher.
     *
     * @param clz interface class to register
     */
    @Override
    public void registerInterface(Class<?> clz) {
        dispatcher.registerInterface(clz);
    }

    /**
     * Sends an already encoded message, connecting first if the client is not connected.
     *
     * <p>If connecting fails and {@code resend} is {@code true}, the message is put on the
     * factory's delay queue for a later resend and {@code null} is returned; otherwise the
     * connection failure is rethrown. A {@link SocketException} raised while sending is
     * reported to {@code future} (when non-null) and is not rethrown.
     *
     * @param message message to send
     * @param resend  whether to queue the message for resending when the connection cannot be established
     * @param future  future to notify on a send error; may be {@code null}
     * @param <O>     response body type
     * @return {@code future}, or {@code null} when the message was queued for resend
     * @throws SocketException if connecting fails and {@code resend} is {@code false}
     */
    @Override
    public <O> SocketFuture<Response<O>> send(Message message, boolean resend, SocketFuture<Response<O>> future) {
        if (!isConnected()) {
            try {
                connect();
            } catch (SocketException e) {
                if (!resend) {
                    throw e;
                }
                Date resendAt = new Date(System.currentTimeMillis() + factory.resendTime);
                factory.delays.put(MessageResend.valueOf(type, getAddress(), message, resendAt));
                // Log the cause as well, otherwise the reason for the failed reconnect is lost.
                logger.error("socket重连失败，消息被存储重发", e);
                return null;
            }
        }
        try {
            // The session is (re)created lazily, only after the session id has changed.
            if (sessionChange) {
                createSession();
                sessionChange = false;
            }
            dispatcher.send(message, session);
        } catch (SocketException ex) {
            if (logger.isDebugEnabled()) {
                logger.debug("---请求[{}]移除, 异常---", message.getSn());
            }
            if (future != null) {
                future.onError(ex);
            }
            // Retry budget exhausted: a short-lived connection is closed and released.
            if (retry > factory.getMaxRetry() && !keepAlive) {
                close();
            }
            retry++;
        }
        return future;
    }

    /**
     * Sends a request after checking that the requested response type matches the type
     * (or, for parameterized types, the raw type) declared for the command.
     *
     * @param request request to send
     * @param type    expected response body class
     * @param resend  whether to queue the message for resending when the connection cannot be established
     * @param <I>     request body type
     * @param <O>     response body type
     * @return the future returned by {@link #send(Request, boolean)}
     * @throws SocketException if the command has no type definition or the types do not match
     */
    @Override
    public <I, O> SocketFuture<Response<O>> send(Request<I> request, Class<O> type, boolean resend) {
        TypeDefinition definition = dispatcher.getDefinition(request.getCommand());
        if (definition == null) {
            // Fail with a descriptive error instead of a NullPointerException on the next line.
            FormattingTuple missing = MessageFormatter.format("指令[{}]的类型定义不存在", request.getCommand());
            logger.error(missing.getMessage());
            throw new SocketException(missing.getMessage());
        }
        Type resultType = definition.getResponse();
        if (resultType.equals(type)) {
            return send(request, resend);
        }
        if (resultType instanceof ParameterizedType) {
            Type rawType = ((ParameterizedType) resultType).getRawType();
            if (rawType.equals(type)) {
                return send(request, resend);
            }
        }
        FormattingTuple message = MessageFormatter.format("要求的信息体类型[{}]与定义类型[{}]不一致", type, resultType);
        logger.error(message.getMessage());
        throw new SocketException(message.getMessage());
    }

    /**
     * Encodes and sends a request. A future is registered only when the command has a
     * type definition with a non-void response and the message has no serial number yet.
     *
     * @param request request to send
     * @param resend  whether to queue the message for resending when the connection cannot be established
     * @param <I>     request body type
     * @param <O>     response body type
     * @return the registered future, or {@code null} when none was registered or the message was queued for resend
     */
    @Override
    public <I, O> SocketFuture<Response<O>> send(Request<I> request, boolean resend) {
        Message message = dispatcher.encodeRequest(request);
        TypeDefinition td = dispatcher.getDefinition(message.getCommand());
        int sn = message.getSn();
        SocketFuture<Response<O>> entry = null;
        if (td != null && td.getResponse() != Void.TYPE && sn == 0) {
            // Register the future under a fresh serial number.
            sn = generator.next();
            message.setSn(sn);
            entry = new SocketFuture<>(futures, sn, null, this, timeOut);
            futures.put(sn, entry);
        }
        return send(message, resend, entry);
    }

    /**
     * Encodes and sends a request with a callback. A future is registered, and a time-out
     * entry is put on the delay queue, only when the command has a type definition with a
     * non-void response and the message has no serial number yet.
     *
     * @param request  request to send
     * @param callback callback handed to the created future
     * @param resend   whether to queue the message for resending when the connection cannot be established
     * @param <I>      request body type
     * @param <O>      response body type
     * @return the registered future, or {@code null} when none was registered or the message was queued for resend
     */
    @Override
    public <I, O> SocketFuture<Response<O>> send(Request<I> request, SocketCallback<Response<O>> callback, boolean resend) {
        Message message = dispatcher.encodeRequest(request);
        TypeDefinition td = dispatcher.getDefinition(message.getCommand());
        int sn = message.getSn();
        SocketFuture<Response<O>> entry = null;
        // Null-check the definition, matching the overload without a callback.
        if (td != null && td.getResponse() != Void.TYPE && sn == 0) {
            sn = generator.next();
            message.setSn(sn);
            entry = new SocketFuture<>(futures, sn, callback, this, timeOut);
            futures.put(sn, entry);
            delays.put(DelayedElement.valueOf(sn, new Date(System.currentTimeMillis() + timeOut)));
        }
        if (logger.isDebugEnabled()) {
            logger.debug("+++消息[{}]请求+++", sn);
        }
        return send(message, resend, entry);
    }

    /**
     * Sends an already encoded message without a future.
     *
     * @param message message to send
     * @param resend  whether to queue the message for resending when the connection cannot be established
     */
    @Override
    public void send(Message message, boolean resend) {
        send(message, resend, null);
    }

    /**
     * Returns a send proxy for the given class, obtained from the send proxy factory.
     *
     * @param clz class to proxy
     * @param <T> proxy type
     * @return the proxy
     */
    @Override
    public <T> T getSendProxy(Class<T> clz) {
        return sendProxy.getProxy(clz);
    }

    /**
     * Closes the connection and waits for the close to complete. Does nothing but log when
     * there is no channel or the channel is already inactive.
     */
    @Override
    public void close() {
        // Snapshot the volatile field so the null check and the use see the same channel.
        Channel current = channel;
        if (current == null || !current.isActive()) {
            logger.debug("已经关闭与服务器[{}]的连接", address);
            return;
        }
        logger.debug("准备关闭与服务器[{}]的连接", address);
        ChannelFuture future = current.close();
        future.awaitUninterruptibly();
        // Release the reference.
        channel = null;
        logger.debug("已经关闭与服务器[{}]的连接", address);
    }

    /**
     * Connects to the server unless an active channel already exists. On success the retry
     * counter is reset and this client is submitted to the factory via {@code submitRemove}.
     *
     * @throws SocketException wrapping any failure raised while creating the channel
     */
    @Override
    public void connect() {
        Channel current = channel;
        if (current != null && current.isActive()) {
            return;
        }

        logger.debug("开始连接服务器[{}]", address);
        retry++;
        try {
            createChannel();
            logger.debug("与服务器[{}]连接成功,本地连接[{}]", address, channel.localAddress());
            retry = 0;
            // Submit the time-out (removal) task.
            factory.submitRemove(this);
        } catch (Throwable e) {
            if (retry > factory.getMaxRetry()) {
                close();
            }
            if (e instanceof InterruptedException) {
                // Restore the interrupt status that the catch would otherwise swallow.
                Thread.currentThread().interrupt();
            }
            throw new SocketException(e);
        }
    }

    /**
     * @return whether this is a keep-alive connection
     */
    @Override
    public boolean isKeepAlive() {
        return keepAlive;
    }

    /**
     * Returns the current time for keep-alive clients and the creation time otherwise.
     *
     * @return timestamp in epoch milliseconds
     */
    @Override
    public long getTimestamp() {
        return keepAlive ? System.currentTimeMillis() : this.timestamp;
    }

    /**
     * @return {@code true} when a channel exists and is active
     */
    @Override
    public boolean isConnected() {
        Channel current = channel;
        return current != null && current.isActive();
    }

    /**
     * @return {@code true} when no channel is held
     */
    @Override
    public synchronized boolean isDisposed() {
        return channel == null;
    }

    /**
     * @return {@code true} when there are no pending futures
     */
    @Override
    public boolean isIdle() {
        return futures == null || futures.isEmpty();
    }

    /**
     * Turns keep-alive off and submits this client to the factory for delayed removal.
     */
    @Override
    public void disableKeepAlive() {
        keepAlive = false;
        factory.submitRemove(this);
    }

    /**
     * @return the remote address as {@code host:port}, or {@code "UNKNOW"} when it is unavailable
     */
    @Override
    public String getAddress() {
        return fromInetSocketAddress(address);
    }

    /**
     * Updates the session id. When the value differs from the current one, the session is
     * flagged to be recreated on the next send.
     *
     * @param sessionId new session id
     */
    @Override
    public void setSessionId(long sessionId) {
        if (this.sessionId == sessionId) {
            return;
        }
        this.sessionId = sessionId;
        sessionChange = true;
    }

    /**
     * @return whether heartbeats are sent
     */
    @Override
    public boolean isSendHeartBeat() {
        return sendHeartBeat;
    }

    /**
     * @return the current session; may be {@code null} if none has been created yet
     */
    @Override
    public Session getSession() {
        return session;
    }

    /**
     * Establishes the connection. {@link #connect()} reads {@code channel} right after this
     * call, so implementations are expected to assign it.
     *
     * @throws InterruptedException if the thread is interrupted while connecting
     */
    protected abstract void createChannel() throws InterruptedException;

    /**
     * Creates the session; called before sending whenever the session id has changed.
     */
    protected abstract void createSession();

    /**
     * Formats a socket address as {@code host:port}.
     *
     * @param address address to format; may be {@code null}
     * @return the formatted address, or {@code "UNKNOW"} when the address is {@code null} or unresolved
     */
    private String fromInetSocketAddress(InetSocketAddress address) {
        if (address == null) {
            return "UNKNOW";
        }
        InetAddress inetAddress = address.getAddress();
        return inetAddress == null ? "UNKNOW" : inetAddress.getHostAddress() + ":" + address.getPort();
    }
}
